// Copyright 2023 AntGroup CO., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

use std::{
    collections::VecDeque,
    iter::Peekable,
    rc::Rc,
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use log_util::info;
use tracing::{span, Level};

use crate::{
    api::{
        filter,
        filter::{filter_graph_data_by_ttl, StoreFilterPushdown},
        graph::graph_info_util::is_vertex,
    },
    common::GraphData,
    config::CStoreConfig,
    context::EngineContext,
    engine::{
        dict::{
            id_dict::{IdDict, RockDbIdDict},
            label_dict::LabelDict,
        },
        sequence_id::SequenceId,
    },
    error_and_panic,
    index::csr_index::CsrIndex,
    iterator::{
        scan_data_iterator::ScanDataMergeIterator,
        scan_data_iterator_with_edge_limit::ScanDataEdgeLimitedIterator,
        search_data_iterator::SearchDataIterator, segment_iterator::SegmentIter,
    },
    log_util,
    log_util::trace,
    lsm::{
        compact_manager::CompactManager, level_controller::LevelController, lsm_freezer::LsmFreezer,
    },
    manifest::VersionController,
    metric::MetricUpdater,
    metric_recorder, s_info,
    segment::graph_segment::GraphSegment,
    TableType,
};

/// Graph storage engine.
///
/// Writes go to an in-memory [`GraphSegment`] and are flushed into level-0 tables managed by a
/// [`LevelController`] and indexed by a [`CsrIndex`]; a [`CompactManager`] compacts tables in
/// the background and a [`VersionController`] archives / recovers checkpoints by version.
pub struct Engine {
    /// Configuration this engine was created with.
    cstore_config: CStoreConfig,

    /// Shared runtime context (file operator, table searcher, TTL dict, metric collector, ...).
    /// Replaced with a fresh instance by `ActionTrait::recover`.
    pub engine_context: Arc<EngineContext>,

    /// In-memory write buffer; flushed to a new table when it needs freezing or on `flush`.
    segment: RwLock<GraphSegment>,

    /// Index over flushed tables; also shared with the compaction service.
    index: Arc<CsrIndex>,

    /// Tracks tables per LSM level; also shared with the compaction service.
    level_controller: Arc<LevelController>,

    /// Owns the background compaction service started in `Engine::new`.
    compact_manager: CompactManager,

    /// Archives and recovers engine state by version number.
    version_controller: VersionController,

    /// Maps external src ids to incremental `u32` ids.
    pub id_dict: RockDbIdDict,

    /// Label dictionary; used to build the `EngineContext`.
    pub label_dict: LabelDict,

    /// Source of the sequence id stamped on every value passed to `put`.
    sequence_id_container: SequenceId,
}

/// Lifecycle actions on a store engine: checkpoint archiving, recovery and manual compaction.
pub trait ActionTrait {
    /// Archive the current state under the checkpoint number `version`.
    fn archive(&mut self, version: u64);

    /// Restore state from the checkpoint previously archived as `version`.
    fn recover(&mut self, version: u64);

    /// Trigger a manual compaction.
    fn compact(&mut self);
}

impl Engine {
    /// Create new engine instance.
    pub fn new(cstore_config: CStoreConfig) -> Self {
        // Init logger.
        if cstore_config.log.enable_log {
            log_util::try_init(
                cstore_config.log.log_type,
                cstore_config.log.log_level,
                cstore_config.store.shard_index,
            );
        }
        let _span = span!(
            Level::DEBUG,
            "engine.new",
            trace_id = log_util::generate_trace_id(&cstore_config)
        )
        .entered();

        metric_recorder::try_init(&cstore_config.metric);

        let segment = RwLock::new(GraphSegment::new(&cstore_config.store));
        let index = Arc::new(CsrIndex::new(&cstore_config.index));
        let label_dict = LabelDict::default();
        let engine_context = Arc::new(EngineContext::new(&label_dict, &cstore_config));

        let index_updater: Arc<dyn MetricUpdater> = Arc::clone(&index) as _;

        engine_context.metric_collector.register(index_updater);

        let level_controller = Arc::new(LevelController::new(
            &cstore_config.store,
            Arc::clone(&engine_context.file_operator),
        ));

        let id_dict = RockDbIdDict::new(
            &cstore_config.store.base.local_name_space,
            &cstore_config.dict.id_dict_key,
        );

        // start compact service.
        let mut compact_manager = CompactManager::new(
            Arc::clone(&cstore_config.store),
            Arc::clone(&cstore_config.table),
            Arc::clone(&engine_context),
            Arc::clone(&index),
            Arc::clone(&level_controller),
            Arc::clone(&engine_context.file_operator),
        );

        compact_manager.start_compact_service();

        let version_controller = VersionController::new(
            &cstore_config.manifest,
            &cstore_config.table,
            &cstore_config.dict,
            Arc::clone(&engine_context.file_operator),
        );

        info!("new store engine");

        Engine {
            cstore_config,
            segment,
            index: Arc::clone(&index),
            level_controller: Arc::clone(&level_controller),
            engine_context: Arc::clone(&engine_context),
            compact_manager,
            version_controller,
            id_dict,
            label_dict,
            sequence_id_container: SequenceId::default(),
        }
    }

    /// Put serialized vertex or edge to segment.
    /// The key is incremental ID, which was generated by ID dict.
    pub fn put(&mut self, key: u32, mut value: GraphData) {
        let _span = span!(
            Level::DEBUG,
            "engine.put",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();
        // set sequence id to graph data.
        value.second_key.sequence_id = self.sequence_id_container.get_next();

        let mut segment_with_guard = self.segment.write().unwrap();

        segment_with_guard.put(key, value);
        if segment_with_guard.need_freeze() {
            self._flush(SegmentIter::new(
                segment_with_guard.view(self.engine_context.as_ref()),
            ));

            segment_with_guard.close();
        }
    }

    /// Get serialized vertex or edge list with filter pushdown from segment and
    /// file. The key is incremental id, which was generated by ID dict.
    pub fn get(&self, key: u32, filter_pushdown: &StoreFilterPushdown) -> Option<Vec<GraphData>> {
        let _span = span!(
            Level::DEBUG,
            "engine.get",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();

        let mut graph_data_vec: Vec<GraphData> = vec![];
        let mut partitioned_graph_data_vec = vec![];

        // 1. read from segment.
        {
            let segment_with_guard = self.segment.read().unwrap();
            let option = segment_with_guard.get(key, &self.engine_context);
            if let Some(mem_graph_data_vec) = option {
                let mut new_mem_graph_data_vec = VecDeque::new();
                for graph_data in mem_graph_data_vec {
                    if filter::filter_graph_info(
                        &filter_pushdown.filter_handler,
                        graph_data.second_key.graph_info,
                    ) && filter_graph_data_by_ttl(&self.engine_context.ttl_dict, &graph_data)
                    {
                        new_mem_graph_data_vec.push_back(graph_data);
                    }
                }
                partitioned_graph_data_vec.push(new_mem_graph_data_vec);
            }
        }

        // 2. read by index + lsm.
        // 2.1. read index.
        let start_index = Instant::now();
        let graph_data_index_option = self
            .index
            .create_index_searcher(&self.cstore_config.table, &self.engine_context)
            .get(key, self.level_controller.as_ref(), filter_pushdown);
        trace!(
            "index cost {}us",
            (Instant::now() - start_index).as_micros()
        );

        // 2.2. read value.
        let start_search_value = Instant::now();
        if let Some(graph_data_index_vec) = graph_data_index_option {
            let res = self
                .engine_context
                .table_searcher
                .try_search_and_attach_property_in_parallel(
                    &self.cstore_config.table,
                    &self.engine_context,
                    graph_data_index_vec,
                    filter_pushdown,
                );

            for fid_to_graph_data_vec in Arc::try_unwrap(res).unwrap().into_iter() {
                partitioned_graph_data_vec.push(fid_to_graph_data_vec.1);
            }
        }
        trace!(
            "search value cost {}us",
            (Instant::now() - start_search_value).as_micros()
        );

        // 3 merge graph data.
        let start_merge_data = Instant::now();
        let search_data_iterator = SearchDataIterator::new(
            partitioned_graph_data_vec,
            self.engine_context.sort_field.as_slice(),
        );
        trace!(
            "merge graph data cost {}us",
            (Instant::now() - start_merge_data).as_micros()
        );

        // 4. other filter:
        // 4.1 keep first vertex.
        // 4.2 delete duplicate edge.
        // 4.3 check must contain vertex.
        let mut has_vertex = false;
        for graph_data in search_data_iterator {
            let is_vertex = is_vertex(graph_data.second_key.graph_info);
            if is_vertex {
                if has_vertex {
                    continue;
                }
                has_vertex = true;
            } else {
                let pre_graph_data_opt = graph_data_vec.last();
                if let Some(pre_graph_data) = pre_graph_data_opt {
                    if graph_data.second_key.graph_info == pre_graph_data.second_key.graph_info
                        && graph_data
                            .second_key
                            .target_id
                            .eq(&pre_graph_data.second_key.target_id)
                    {
                        continue;
                    }
                }
            }
            graph_data_vec.push(graph_data);
        }

        // check must contains vertex.
        if filter_pushdown.filter_context.vertex_must_contains && !has_vertex {
            return None;
        }

        trace!("success to get key {}", key);
        if graph_data_vec.is_empty() {
            None
        } else {
            Some(graph_data_vec)
        }
    }

    /// Scan the whole graph by ID order, return vertex and edge iterator.
    pub fn scan(
        &self,
        filter_pushdown: Rc<StoreFilterPushdown>,
    ) -> Peekable<ScanDataEdgeLimitedIterator> {
        let _span = span!(
            Level::DEBUG,
            "engine.scan",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();

        // flush segment to disk before scan, to avoid conflict with put operation.
        self.flush();

        let level_size = self.level_controller.get_level_sizes();

        let mut multi_sorted_table_ids = vec![];

        // avoid conflict with compaction.
        {
            let _lock = self.level_controller.scan_lock_guard();

            self.level_controller
                .get_level_handler(0)
                .read()
                .unwrap()
                .get_table_info_map()
                .iter()
                .map(|tuple| tuple.0)
                .for_each(|fid| multi_sorted_table_ids.push(vec![*fid]));

            for level in 1..level_size.len() {
                let mut sorted_table_ids = vec![];
                self.level_controller
                    .get_level_handler(level)
                    .read()
                    .unwrap()
                    .get_table_info_map()
                    .iter()
                    .map(|tuple| tuple.0)
                    .for_each(|fid| sorted_table_ids.push(*fid));
                multi_sorted_table_ids.push(sorted_table_ids);
            }
        }

        let scan_data_merge_iterator = ScanDataEdgeLimitedIterator::new(
            ScanDataMergeIterator::new(
                multi_sorted_table_ids,
                self.engine_context.as_ref(),
                &self.cstore_config.table,
                Rc::clone(&filter_pushdown),
                true,
            ),
            filter_pushdown,
        );

        scan_data_merge_iterator.peekable()
    }

    /// Flush data in segment to file.
    pub fn flush(&self) {
        let _span = span!(
            Level::DEBUG,
            "engine.flush",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();

        let mut segment_with_guard = self.segment.write().unwrap();

        if !segment_with_guard.is_empty() {
            self._flush(SegmentIter::new(
                segment_with_guard.view(self.engine_context.as_ref()),
            ));

            segment_with_guard.close();
        }
        info!("finished to flush store.");
    }

    /// Register src_id to ID dict and return incremental ID.
    pub fn register(&self, key: &[u8]) -> u32 {
        self.id_dict.register(key)
    }

    /// Get ID from ID dict by src_id.
    pub fn get_id(&self, key: &[u8]) -> Option<u32> {
        self.id_dict.get_id(key)
    }

    fn _flush(&self, segment_iterator: SegmentIter) {
        // Limit the amount of data written to disk.
        loop {
            let files_num_in_level_0 = self
                .level_controller
                .get_level_handler(0)
                .read()
                .unwrap()
                .get_table_info_map()
                .len();

            if files_num_in_level_0 > (self.cstore_config.store.level0_file_num * 2) {
                std::thread::sleep(Duration::from_secs(
                    self.cstore_config.store.compactor_interval,
                ));

                // Check the state of compact threads.
                if self.compact_manager.get_stop_state() {
                    error_and_panic!("compact error, please check log");
                }

                s_info!(
                    self.cstore_config.shard_index,
                    "sleep {}s to limit the data written to disk, \
                    because the number of files which has not yet been compacted in level 0 is {}",
                    self.cstore_config.store.compactor_interval,
                    files_num_in_level_0
                );
            } else {
                break;
            }
        }

        // Create data table.
        let mut table = self
            .level_controller
            .create_new_table(&self.cstore_config.table, TableType::Vs)
            .unwrap_or_else(|e| {
                error_and_panic!(
                    "error occurred in building property table in writing: {}",
                    e
                );
            });

        // Create index builder.
        let mut index_builder = self
            .index
            .create_index_builder(&self.level_controller, &self.cstore_config.table, 0)
            .unwrap_or_else(|e| {
                error_and_panic!("error occurred in building index table in writing: {}", e);
            });

        LsmFreezer::freeze_and_flush_data(
            self.cstore_config.store.as_ref(),
            segment_iterator,
            &mut index_builder,
            &mut table,
        )
        .unwrap_or_else(|e| {
            error_and_panic!(
                "error occurred in flushing property table in writing: {}",
                e
            );
        });

        // close table after use.
        table.close();

        // finish building index.
        let new_table_info = index_builder
            .flush(table.get_fid(), table.get_table_offset())
            .unwrap_or_else(|e| {
                error_and_panic!("error occurred in flushing index table in writing: {}", e)
            });

        // register to lsm tree.
        self.level_controller.register_to_lsm(&[new_table_info], 0);
    }

    /// Release memory in cstore and close it.
    pub fn close(&mut self) {
        self.segment.write().unwrap().close();
        self.index.close();
        self.level_controller.close();
        self.version_controller.close();
        self.compact_manager.close();
        self.id_dict.drop_local();
        self.engine_context.metric_collector.close();
        info!("finished to close store.");
    }
}

impl ActionTrait for Engine {
    /// Archive the current engine state under checkpoint `version`.
    ///
    /// Delegates to `VersionController::archive`, passing the archived sequence id, the
    /// index, the level controller, the cache handler, both dictionaries and the compact
    /// manager's compact-drop lock. The archive time cost is logged when it finishes.
    fn archive(&mut self, version: u64) {
        let _span = span!(
            Level::DEBUG,
            "engine.archive",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();
        s_info!(
            self.cstore_config.base.shard_index,
            "start to archive to version {}",
            version
        );

        // NOTE(review): the compact-drop lock is presumably what keeps compaction from
        // dropping tables while they are archived — confirm in CompactManager.
        self.version_controller.archive(
            version,
            self.sequence_id_container.archive(),
            &self.index,
            &self.level_controller,
            &self.engine_context.cache_handler,
            &self.id_dict,
            &self.label_dict,
            self.compact_manager.get_compact_drop_lock(),
        );

        s_info!(
            self.cstore_config.base.shard_index,
            "success to archive to version {}, time cost info: {}",
            version,
            &self.version_controller.vc_log_info.archive_time_cost
        );
    }

    /// Restore engine state from checkpoint `version`.
    ///
    /// Holds the level controller's recover write guard for the whole call. It lets
    /// `VersionController::recover` reload the index, level controller, ID dict, label dict
    /// and sequence id, then rebuilds `engine_context` from the recovered label dict.
    fn recover(&mut self, version: u64) {
        let _span = span!(
            Level::DEBUG,
            "engine.recover",
            trace_id = log_util::generate_trace_id(&self.cstore_config)
        )
        .entered();
        // Bound to a named `_`-prefixed variable so the guard lives until the method returns
        // (a plain `_` pattern would drop it immediately).
        let _lock_guard = self.level_controller.recover_write_guard();

        s_info!(
            self.cstore_config.base.shard_index,
            "start to recover from version {}",
            version
        );

        self.version_controller.recover(
            version,
            &self.index,
            &self.level_controller,
            &self.id_dict,
            &self.label_dict,
            &self.sequence_id_container,
        );

        // renew engine context when label dict is recovered.
        // NOTE(review): only `self.engine_context` is replaced. The compact manager was built
        // with a clone of the previous context, and the index metric updater registered in
        // `Engine::new` is not re-registered on the new context's `metric_collector` here;
        // confirm both are intended.
        self.engine_context = Arc::new(EngineContext::new(&self.label_dict, &self.cstore_config));

        s_info!(
            self.cstore_config.base.shard_index,
            "success to recover from version {}, time cost info: {}",
            version,
            &self.version_controller.vc_log_info.load_time_cost
        );
    }

    /// Manual compaction; currently a no-op (compaction only runs in the background service).
    fn compact(&mut self) {
        // TODO: need implement manual compact.
    }
}

#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use itertools::Itertools;
    use rand::{seq::SliceRandom, Rng, SeedableRng};
    use rand_isaac::IsaacRng;
    use rustc_hash::FxHashSet;

    use super::*;
    #[cfg(feature = "hdfs")]
    use crate::test_util::build_hdfs_test_config;
    #[cfg(feature = "opendal")]
    use crate::test_util::build_oss_test_config;
    use crate::{
        api::graph::graph_serde::GraphSerde,
        log_util::{debug, LogLevel, LogType},
        test_util::{bind_job_name_with_ts, delete_fo_test_dir, delete_test_dir, GraphDataBuilder},
        CStoreConfigBuilder, ConfigMap,
    };

    const REMOTE_ITERATIONS: u32 = 100;
    const LOCAL_ITERATIONS: u32 = 20000;

    const COMPACT_KEY_RANGE: u32 = 10000;
    const COMPACT_ITERATIONS: u32 = 100000;

    const LOCATION_TYPE: [&str; 2] = ["Local", "Remote"];

    /// Shared end-to-end scenario for one storage backend.
    ///
    /// Writes keys `1..iterations` (three edges and three vertices each), flushing and
    /// reading back periodically. It then checks archive/recover within one engine, and
    /// again across engine restarts for versions 2 and 3.
    fn test_cstore(config_map: &ConfigMap, iterations: u32) {
        log_util::try_init(LogType::ConsoleAndFile, LogLevel::Debug, 0);
        let location = LOCATION_TYPE.choose(&mut rand::thread_rng()).unwrap();

        let cstore_config = CStoreConfigBuilder::default()
            .set_with_map(config_map)
            .set("store.location", *location)
            .build();

        debug!(
            "location type {}, persistent type {}",
            cstore_config.store.location, cstore_config.persistent.persistent_type
        );

        let table_config = Arc::clone(&cstore_config.table);
        let manifest_config = cstore_config.manifest.clone();
        let persistent_config = cstore_config.persistent.clone();

        let graph_serde = GraphSerde::default();
        let label_dict = Arc::new(LabelDict::default());

        let mut engine = Engine::new(cstore_config);

        // Clamp to 1 so that small iteration counts (< 50 or < 5) do not make the `%` below
        // divide by zero.
        let flush_interval = (iterations / 50).max(1);
        let check_interval = (iterations / 5).max(1);

        for i in 1..iterations {
            // Duplicate puts exercise edge de-duplication and "keep first vertex" in `get`.
            for _ in 0..3 {
                engine.put(
                    i,
                    GraphDataBuilder::build_edge_graph_data(i, &graph_serde, &label_dict),
                );
            }
            for _ in 0..3 {
                engine.put(
                    i,
                    GraphDataBuilder::build_vertex_graph_data(i, &graph_serde, &label_dict),
                );
            }
            if i % flush_interval == 0 {
                engine.flush();
            }
            if i % check_interval == 0 {
                for j in 1..i {
                    GraphDataBuilder::get_and_assert_data(&engine, j);
                }
            }
        }
        engine.flush();

        engine.archive(1);

        engine.recover(1);

        for i in 1..iterations {
            GraphDataBuilder::get_and_assert_data(&engine, i);
        }

        sleep(Duration::from_secs(3));

        engine.archive(1);

        engine.recover(1);

        for i in 1..iterations {
            GraphDataBuilder::get_and_assert_data(&engine, i);
        }

        engine.archive(2);

        engine.archive(3);

        engine.close();

        // NOTE(review): the restarted engines below do not set `store.location`, so they use
        // the builder default rather than `location` chosen above — confirm this is intended.
        engine = Engine::new(
            CStoreConfigBuilder::default()
                .set_with_map(config_map)
                .build(),
        );

        engine.recover(2);

        for i in 1..iterations {
            GraphDataBuilder::get_and_assert_data(&engine, i);
        }

        engine.close();

        engine = Engine::new(
            CStoreConfigBuilder::default()
                .set_with_map(config_map)
                .build(),
        );

        engine.recover(3);

        for i in 1..iterations {
            GraphDataBuilder::get_and_assert_data(&engine, i);
        }

        engine.close();

        delete_test_dir(&table_config, &persistent_config);
        delete_fo_test_dir(&manifest_config, &persistent_config);
    }

    #[test]
    fn test_engine_local() {
        let mut config_map = ConfigMap::default();

        let job_name = bind_job_name_with_ts("test_engine_local");
        config_map.insert("store.job_name", job_name.as_str());

        test_cstore(&config_map, LOCAL_ITERATIONS);
    }

    #[cfg(feature = "opendal")]
    #[test]
    fn test_engine_oss() {
        let mut config_map = ConfigMap::default();
        let job_name = bind_job_name_with_ts("test_engine_oss");
        config_map.insert("store.job_name", job_name.as_str());

        build_oss_test_config(&mut config_map);

        test_cstore(&config_map, REMOTE_ITERATIONS);
    }

    #[cfg(feature = "hdfs")]
    #[test]
    fn test_engine_hdfs() {
        let mut config_map = ConfigMap::default();
        let job_name = bind_job_name_with_ts("test_engine_hdfs");
        config_map.insert("store.job_name", job_name.as_str());

        build_hdfs_test_config(&mut config_map);

        test_cstore(&config_map, REMOTE_ITERATIONS);
    }

    /// Stress test with small segments and tables so that flushes and background compaction
    /// happen constantly. Random keys are written while random subsets are read back, and
    /// every distinct key is verified at the end.
    #[test]
    fn test_engine_compact_local() {
        log_util::try_init(LogType::ConsoleAndFile, LogLevel::Debug, 0);

        let mut config_map = ConfigMap::default();

        let job_name = bind_job_name_with_ts("test_engine_compact_local");
        config_map.insert("store.job_name", job_name.as_str());
        config_map.insert("store.mem_segment_size", "5120");
        config_map.insert("store.level1_file_size", "16384");
        config_map.insert("store.file_size_multiplier", "4");
        config_map.insert("store.level0_file_num", "4");
        config_map.insert("index.index_granularity", "128");

        config_map.insert("store.compact_thread_num", "8");
        config_map.insert("store.max_buffer_size_in_segment", "1048");
        config_map.insert("store.compactor_interval", "1");
        config_map.insert("table.block_size", "512");
        config_map.insert("store.compactor_gc_ratio", "1.2");

        let cstore_config = CStoreConfigBuilder::default()
            .set_with_map(&config_map)
            .build();

        let table_config = Arc::clone(&cstore_config.table);
        let manifest_config = cstore_config.manifest.clone();
        let persistent_config = cstore_config.persistent.clone();

        let graph_serde = GraphSerde::default();
        let label_dict = Arc::new(LabelDict::default());

        debug!("config {:?}", &cstore_config.store);
        let mut engine = Engine::new(cstore_config);
        // Spread the writes over roughly 60 seconds in total.
        let sleep_interval = 60000000 / COMPACT_ITERATIONS as u64;

        let mut rng: IsaacRng = SeedableRng::from_entropy();

        let test_key_vec = (0..COMPACT_ITERATIONS)
            .map(|_| rng.gen_range(1..=COMPACT_KEY_RANGE))
            .collect_vec();
        let mut write_key_vec = vec![];

        for (index, i) in test_key_vec.iter().enumerate() {
            engine.put(
                *i,
                GraphDataBuilder::build_edge_graph_data(*i, &graph_serde, &label_dict),
            );
            engine.put(
                *i,
                GraphDataBuilder::build_vertex_graph_data(*i, &graph_serde, &label_dict),
            );
            write_key_vec.push(*i);

            if index != 0 && index as u32 % (COMPACT_ITERATIONS / 100) == 0 {
                for _j in 1..100 {
                    let read_key = *write_key_vec.choose(&mut rand::thread_rng()).unwrap();

                    GraphDataBuilder::get_and_assert_data(&engine, read_key);
                }

                debug!(
                    "compact test, get and assert data passed, iterations {}",
                    index
                );
            }

            sleep(Duration::from_micros(sleep_interval));
        }
        engine.flush();

        let test_read_set: FxHashSet<u32> = test_key_vec.into_iter().collect();
        let mut test_read_vec = test_read_set.into_iter().collect_vec();
        test_read_vec.sort();

        for i in test_read_vec {
            GraphDataBuilder::get_and_assert_data(&engine, i);
        }

        // Leave time for background compaction to finish before shutting down.
        sleep(Duration::from_secs(120));

        engine.close();

        delete_test_dir(&table_config, &persistent_config);
        delete_fo_test_dir(&manifest_config, &persistent_config);
    }
}
